package com.xueyuan.wata.daph.node.flink117.string.general.connector

import com.xueyuan.wata.daph.flink117.api.node.string.StringInput
import com.xueyuan.wata.daph.node.flink117.string.general.{GeneralInputConfig, GeneralNode}
import org.apache.commons.lang3.StringUtils

/**
 * Input node that exposes the result of a configured SQL query as a temporary view
 * and passes the view's name on as its output.
 *
 * The node is driven by a [[GeneralInputConfig]]; `su`, `createTable` and `tableEnv`
 * are inherited from the mixed-in traits.
 */
class GeneralInput extends StringInput with GeneralNode {
  /**
   * Registers the query result as a temporary view when a result SQL is configured.
   *
   * When `resultSql` is null or empty nothing is executed and only the configured
   * table name is returned.
   *
   * @return the configured `resultTableName`, whether or not a view was created
   */
  override def in(): String = {
    val config = nodeConfig.asInstanceOf[GeneralInputConfig]
    val databaseName = config.databaseName
    val resultTableName = config.resultTableName
    val createSql = config.createSql
    val resultSql = config.resultSql

    // NOTE(review): the whole setup is gated on resultSql, so createTable is also
    // skipped when only createSql is provided -- confirm this is intended.
    if (StringUtils.isNotEmpty(resultSql)) {
      // Set the SQL dialect and switch to the configured catalog.
      su(config)

      // Create the table.
      createTable(databaseName, createSql, resultSql)

      // Run the query and register its result as a temporary view.
      // (The guard above already guarantees resultSql is non-empty here.)
      val table = tableEnv.sqlQuery(resultSql)
      tableEnv.createTemporaryView(resultTableName, table)
    }

    // Hand the table name to the downstream node.
    resultTableName
  }

  /** Configuration class this node expects in `nodeConfig`. */
  override def getNodeConfigClass: Class[GeneralInputConfig] = classOf[GeneralInputConfig]
}
